
package org.apache.seatunnel.connectors.seatunnel.gaussdbcassandra.Client;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.gaussdbcassandra.exception.GaussDBConnectorException;

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

public class GaussDBClient {

    public static CqlSessionBuilder getCqlSessionBuilder(String nodeAddress, String keyspace, String username,
                                                         String password, String dataCenter) {
        List<CqlSessionBuilder> cqlSessionBuilderList = Arrays.stream(nodeAddress.split(",")).map(address -> {
            String[] nodeAndPort = address.split(":", 2);
            if (StringUtils.isEmpty(username) && StringUtils.isEmpty(password)) {
                return CqlSession.builder()
                        .addContactPoint(new InetSocketAddress(nodeAndPort[0], Integer.parseInt(nodeAndPort[1])))
                        .withKeyspace(keyspace).withLocalDatacenter(dataCenter);
            }
            return CqlSession.builder()
                    .addContactPoint(new InetSocketAddress(nodeAndPort[0], Integer.parseInt(nodeAndPort[1])))
                    .withAuthCredentials(username, password).withKeyspace(keyspace).withLocalDatacenter(dataCenter);
        }).collect(Collectors.toList());
        return cqlSessionBuilderList.get(ThreadLocalRandom.current().nextInt(cqlSessionBuilderList.size()));
    }

    // Test that the connection works
    public static SimpleStatement createSimpleStatement(String cql, ConsistencyLevel consistencyLevel) {
        return SimpleStatement.builder(cql).setConsistencyLevel(consistencyLevel).build();
    }

    public static ColumnDefinitions getTableSchema(CqlSession session, String table) {
        try {
            return session.execute(String.format("select * from %s limit 1", table)).getColumnDefinitions();
        } catch (GaussDBConnectorException e) {
            throw new GaussDBConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED,
                    "Cannot get table schema from cassandra", e);
        }
    }
}
